use std::{
    collections::VecDeque,
    ffi::{c_uint, c_void},
    ops::Deref,
    ptr::null_mut,
    sync::{
        atomic::{AtomicBool, AtomicU64, Ordering},
        Arc, Mutex,
    },
};

use embed_std::{errorln, infoln, thread::sleep_ms};

use crate::qbus::QBus;

const TOPIC_QDISPATCHER: &str = "qd";

pub type QdispatcherCb = Option<
    unsafe extern "C" fn(
        id: c_uint,
        payload: *const c_void,
        payload_len: c_uint,
        args: *mut c_void,
    ),
>;

#[repr(C)]
#[derive(Debug, Clone)]
struct QMsgHandler {
    id: c_uint,
    data_len: c_uint,
}

struct QEventHandler {
    event_id: c_uint,
    cb: QdispatcherCb,
    args: *mut c_void,
}

impl PartialEq<Self> for QEventHandler {
    fn eq(&self, other: &Self) -> bool {
        self.event_id == other.event_id && self.cb == other.cb
    }
}

impl Clone for QEventHandler {
    fn clone(&self) -> Self {
        Self {
            event_id: self.event_id,
            cb: self.cb,
            args: self.args,
        }
    }
}

unsafe impl Send for QEventHandler {}
unsafe impl Sync for QEventHandler {}

pub trait QEventHandlerR: Send + Sync {
    fn handle(&mut self, event_id: u32, data: &[u8]);
}

struct AddQEventHandlerR {
    pub events: Vec<u32>,
    pub handler: Box<dyn QEventHandlerR>,
    pub seq: u64,
}

enum QEventHandlerRequest {
    Add(QEventHandler),
    Remove(QEventHandler),
    AddR(AddQEventHandlerR),
    RemoveR(u64),
}

pub const QDIST_EVENT_ALL:c_uint = u32::MAX;

pub struct QDispatcher(Arc<QDispatcherInner>);

impl Clone for QDispatcher {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

impl QDispatcher {
    pub fn new(bus: QBus) -> Self {
        Self(Arc::new(QDispatcherInner::new(bus)))
    }

    pub fn start_dispatch(&self) -> embed_std::Result<()> {
        let inner = self.0.clone();
        let mut qc = self.0.bus.create_consumer();
        qc.subscribe(TOPIC_QDISPATCHER);
        embed_std::thread::Thread::new_with_name(32 * 1024, "q_disp", move || {
            infoln!(">>>");
            let mut callbacks: Vec<QEventHandler> = Vec::new();
            let mut r_callbacks: Vec<AddQEventHandlerR> = Vec::new();
            while !inner.is_quit.load(Ordering::SeqCst) {
                {
                    let mut reqs = inner.reqs.lock().unwrap();
                    while !reqs.is_empty() {
                        if let Some(req) = reqs.pop_front() {
                            match req {
                                QEventHandlerRequest::Add(cb) => {
                                    if callbacks.iter().find(|v| *v == &cb).is_none() {
                                        callbacks.push(cb);
                                    }
                                    infoln!("after add callbacks size {}", callbacks.len());
                                }
                                QEventHandlerRequest::Remove(cb) => {
                                    callbacks.retain(|v| v != &cb);
                                    infoln!("after remove callbacks size {}", callbacks.len());
                                }
                                QEventHandlerRequest::AddR(cb) => {
                                    infoln!("add handler {}", cb.seq);
                                    r_callbacks.push(cb);
                                    infoln!("after add rust callbacks size {}", r_callbacks.len());
                                }
                                QEventHandlerRequest::RemoveR(seq) => {
                                    infoln!("remove token {}", seq);
                                    r_callbacks.retain(|v| v.seq != seq);
                                    infoln!(
                                        "after remove rust callbacks size {}",
                                        r_callbacks.len()
                                    );
                                }
                            }
                        }
                    }
                }

                if let Some(msg) = qc.read() {
                    let header_size = core::mem::size_of::<QMsgHandler>();
                    if msg.payload.len() < header_size {
                        errorln!("topic {} invalid payload", msg.topic);
                        continue;
                    }
                    let header = unsafe { &*(msg.payload.as_ptr() as *const QMsgHandler) };
                    if msg.payload.len() < header_size + header.data_len as usize {
                        errorln!("topic {} invalid payload", msg.topic);
                        continue;
                    }
                    let data = &msg.payload.as_slice()[header_size..];
                    for handler in r_callbacks.iter_mut() {
                        if !handler.events.contains(&QDIST_EVENT_ALL) && !handler.events.contains(&header.id) {
                            continue;
                        }
                        handler.handler.handle(header.id, data);
                    }
                    for handler in callbacks.iter() {
                        if handler.event_id != QDIST_EVENT_ALL && handler.event_id != header.id {
                            continue;
                        }
                        if let Some(cb) = handler.cb {
                            unsafe {
                                cb(
                                    header.id,
                                    data.as_ptr() as *const c_void,
                                    data.len() as c_uint,
                                    handler.args,
                                )
                            };
                        }
                    }
                    continue;
                }
                sleep_ms(10);
            }
            qc.unsubscribe(TOPIC_QDISPATCHER);
            infoln!("<<<");
        })?;
        Ok(())
    }

    pub fn stop_dispatch(&self) {
        self.0.is_quit.store(true, Ordering::SeqCst);
    }
}

impl Deref for QDispatcher {
    type Target = QDispatcherInner;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl Drop for QDispatcher {
    fn drop(&mut self) {
        let count = Arc::strong_count(&self.0);
        if count == 2 {
            // self and thread
            self.stop_dispatch();
        }
        infoln!("drop QDispatcher {}", count);
    }
}

pub struct QDispatcherInner {
    is_quit: AtomicBool,
    reqs: Mutex<VecDeque<QEventHandlerRequest>>,
    bus: QBus,
    seq: AtomicU64,
}

impl Drop for QDispatcherInner {
    fn drop(&mut self) {
        infoln!("drop QDispatcherInner");
    }
}

impl QDispatcherInner {
    pub fn new(bus: QBus) -> Self {
        Self {
            is_quit: AtomicBool::new(false),
            reqs: Mutex::new(VecDeque::new()),
            bus,
            seq: AtomicU64::new(0),
        }
    }

    pub fn add_listener(&self, id: c_uint, cb: QdispatcherCb, args: *mut c_void) {
        self.add_request(QEventHandlerRequest::Add(QEventHandler {
            event_id: id,
            cb,
            args,
        }));
    }

    pub fn remove_listener(&self, id: c_uint, cb: QdispatcherCb) {
        self.add_request(QEventHandlerRequest::Remove(QEventHandler {
            event_id: id,
            cb,
            args: null_mut(),
        }));
    }

    pub fn add_listener_r(
        &self,
        events: &[u32],
        handler: Box<dyn QEventHandlerR>,
    ) -> embed_std::Result<u64> {
        let seq = self.seq.fetch_add(1, Ordering::SeqCst);
        self.add_request(QEventHandlerRequest::AddR(AddQEventHandlerR {
            events: Vec::from(events),
            handler,
            seq,
        }));
        Ok(seq)
    }

    pub fn remove_listener_r(&self, token: u64) {
        self.add_request(QEventHandlerRequest::RemoveR(token));
    }

    fn add_request(&self, req: QEventHandlerRequest) {
        let mut reqs = self.reqs.lock().unwrap();
        reqs.push_back(req);
    }

    pub fn publish(&self, id: c_uint, data: &[u8]) -> embed_std::Result<()> {
        let mut d = Vec::new();
        let header = QMsgHandler {
            id,
            data_len: data.len() as c_uint,
        };
        let hs = unsafe {
            core::slice::from_raw_parts(
                &header as *const QMsgHandler as *const u8,
                core::mem::size_of::<QMsgHandler>(),
            )
        };
        d.extend_from_slice(hs);
        if data.len() > 0 {
            d.extend_from_slice(data);
        }
        self.bus.publish(TOPIC_QDISPATCHER, d.as_slice())?;
        Ok(())
    }
}
